[SPARK-57000][CORE][SS][RTM] Add concurrent scheduling capabilites for Real-time Mode#56055

Open
jerrypeng wants to merge 5 commits into apache:master from jerrypeng:oss-concurrent-stage-scheduler

@jerrypeng jerrypeng commented May 22, 2026

Contributor

What changes were proposed in this pull request?

This PR introduces ConcurrentStageDAGScheduler, the scheduler needed to power real-time mode for Structured Streaming.

In real-time mode, a streaming query continuously produces output with end-to-end latency on the order of tens of milliseconds — far below the latency floor of traditional micro-batch
execution. To get there, the query has to abandon the "run stage N, materialize its shuffle output, then run stage N+1" model that the default DAGScheduler enforces. Instead, every
stage of the query must run at the same time, with records flowing from upstream tasks to downstream tasks through a streaming shuffle as they're produced.

ConcurrentStageDAGScheduler is the scheduling half of that design. Concretely, it:

  • Adds a new opt-in DAGScheduler implementation, ConcurrentStageDAGScheduler, selected via spark.scheduler.dagSchedulerType=ConcurrentStageDAGScheduler and engaged per-job via the
    streaming.concurrent.stages.enabled=true property.
  • Walks the stage DAG on job submission, marks all stages reachable from the final stage as concurrent, and validates that the cluster has enough free slots to run them all simultaneously
    (gated by spark.scheduler.realtimeModeSlotsCheck.disabled, with a new CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class for the failure path). The DAG walk accumulates into a local
    set and only commits to scheduler state after the slot check passes, so a failed submission can't leak partial state.
  • Submits child stages while their parents are still running, and defers a child stage's task-completion events until every concurrent parent has actually finished — preserving the
    invariant that DAGScheduler only sees "all of a stage's parents are done" task completions, even though tasks are physically running concurrently.
  • Adds the smallest extension points to DAGScheduler to make this possible: one empty onFinalStageCreated hook, two package-private accessors (isRunningStage, getStage), and
    relaxes submitStage, markStageAsFinished, submitMissingTasks, and activeJobForStage from private to protected. All hooks are no-ops for the default scheduler.
  • Wires real-time-mode-aware behavior into TaskSchedulerImpl (TaskSets with concurrent stages get maxTaskFailures=1, since a streaming task failure must restart the query rather than
    silently retry against a still-running shuffle) and TaskSetManager (ExecutorLostFailure counts toward maxTaskFailures instead of being exempted, so executor loss propagates as a
    query failure rather than a silent stall).
  • Both speculation guards — per-job local property and cluster-wide sc.conf.get(SPECULATION_ENABLED) — reject concurrent-stage jobs with speculation enabled, matching how the rest of
    core reads the config.
  • Refactors DAGSchedulerSuite into an abstract DAGSchedulerSuiteBase + TestDAGScheduler trait so the new suite can reuse the existing scheduler test harness without duplicating it.
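
The "walk into a local set, then commit" step described in the second bullet can be sketched as follows. This is a minimal, self-contained model and not the code in this PR: `Stage`, `ConcurrentStageTracker`, `InsufficientSlotsException`, and the `slotsInUse` counter are hypothetical stand-ins (the real scheduler derives cores in use from `runningStages` and raises the `CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT` error class).

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's Stage.
final case class Stage(id: Int, numTasks: Int, parents: List[Stage] = Nil)

// Hypothetical exception; the PR uses an error class instead.
final class InsufficientSlotsException(needed: Int, free: Int)
  extends RuntimeException(s"Need $needed slots but only $free are free")

final class ConcurrentStageTracker(totalSlots: Int) {
  // Long-lived scheduler state: must never hold stages of a rejected job.
  val concurrentStages: mutable.Set[Int] = mutable.Set.empty
  private var slotsInUse = 0

  def onFinalStageCreated(finalStage: Stage): Unit = {
    // 1. Walk the DAG into a local map; scheduler state is untouched so far.
    //    Keying by stage id means a shared parent (diamond) is counted once.
    val visited = mutable.LinkedHashMap.empty[Int, Stage]
    def walk(stage: Stage): Unit =
      if (!visited.contains(stage.id)) {
        visited(stage.id) = stage
        stage.parents.foreach(walk)
      }
    walk(finalStage)

    // 2. Validate that every visited stage can run at the same time.
    val needed = visited.values.map(_.numTasks).sum
    val free = totalSlots - slotsInUse
    if (free < needed) throw new InsufficientSlotsException(needed, free)

    // 3. Commit only after the slot check has passed.
    concurrentStages ++= visited.keys
    slotsInUse += needed
  }
}
```

Because the throw happens before step 3, a rejected submission leaves `concurrentStages` exactly as it was.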

Why are the changes needed?

Real-time mode is the only execution model in which a Structured Streaming query can deliver sub-100ms end-to-end latency, and concurrent stage scheduling is a hard prerequisite for it.
Here's why the default scheduler can't deliver that on its own:

Sequential stage execution is the latency floor for streaming. The default DAGScheduler waits for stage N to complete — every task done, every byte written to shuffle storage, every
map output registered with the MapOutputTracker — before submitting stage N+1. For a typical streaming query with a source, a stateful operator, and a sink, that means each
micro-batch's latency is the sum of each stage's processing time plus the sum of each shuffle's serialization/deserialization cost. Even with small per-stage costs, the sum dominates
as queries get more complex, and there's no architectural way to reduce it within the existing scheduler.

Real-time mode pipes data between stages via a streaming shuffle, not a materialized one. Downstream tasks subscribe to upstream tasks' output as it's produced — there's no "stage N
is done, here are the map outputs" handoff. For that to work, all stages of the job must be running simultaneously when records start flowing. If stage N+1 isn't running yet, stage N has
no consumer for the records it produces and either drops them or blocks. So "schedule all stages concurrently" isn't an optimization for real-time mode — it's a correctness requirement of
the streaming shuffle.

Failure semantics also have to change. In batch mode, a task failure caused by an executor crash is exempted from the failure count because the executor's loss isn't the task's fault
and the framework can re-run the task elsewhere. In real-time mode that exemption is wrong: the streaming shuffle has in-flight records that can't be reconstructed, so an executor loss
must fail the query and let it restart from a checkpoint. Similarly, retrying a single task against a streaming shuffle that's already partially consumed would corrupt state — so
concurrent-stage TaskSets are capped at maxFailures=1.
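
A rough sketch of the two failure-semantics rules above, assuming only the property name given in this description. `ConcurrentFailurePolicy` and its method signatures are hypothetical, and the model covers only the executor-loss case rather than every failure reason Spark distinguishes.

```scala
import java.util.Properties

// Hypothetical helper object; names and signatures are illustrative only.
object ConcurrentFailurePolicy {
  // Per-job property named in the PR description.
  val ConcurrentStagesKey = "streaming.concurrent.stages.enabled"

  // Tolerates null, since a TaskSet may carry no properties in tests.
  def isConcurrentStagesEnabled(props: Properties): Boolean =
    props != null &&
      java.lang.Boolean.parseBoolean(props.getProperty(ConcurrentStagesKey))

  // Concurrent-stage TaskSets get exactly one attempt, whatever the
  // cluster default (spark.task.maxFailures) says.
  def effectiveMaxTaskFailures(props: Properties, clusterDefault: Int): Int =
    if (isConcurrentStagesEnabled(props)) 1 else clusterDefault

  // Batch mode exempts an executor loss that the app did not cause;
  // concurrent mode counts it, so the query fails and can restart.
  def countsTowardFailures(
      executorLost: Boolean,
      exitCausedByApp: Boolean,
      concurrent: Boolean): Boolean =
    if (executorLost && !exitCausedByApp) concurrent else true
}
```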

The default scheduler must stay untouched for batch. Real-time mode is opt-in and additive — the cluster still needs to run batch and non-real-time streaming jobs with their existing
semantics. Hence the scheduler-type config, the per-job opt-in property, and the empty-by-default onFinalStageCreated hook on the base DAGScheduler: when nothing opts in, nothing
changes.

Does this PR introduce any user-facing change?

No user-facing behavior change for any existing workload. Without setting the new config, SparkContext builds the same DAGScheduler it always has, and the default scheduler's
behavior is unchanged.

The PR does introduce two new internal configs (both internal(), so not part of the public surface):

  • spark.scheduler.dagSchedulerType — chooses the DAGScheduler implementation. Defaults to "DAGScheduler".
  • spark.scheduler.realtimeModeSlotsCheck.disabled — skips the slot-availability check used by the concurrent scheduler. Defaults to false.

And one new error class:

  • CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT — thrown by the concurrent scheduler when a streaming job needs more concurrent slots than the cluster offers.

How was this patch tested?

Added one new test suite plus targeted regression tests in the existing TaskScheduler suites:

  1. ConcurrentStageDAGSchedulerSuite — exercises the new scheduler end-to-end through the existing DAGSchedulerSuiteBase test harness. Tests cover:

    • Happy paths. Simple two-stage concurrent job (both stages enter runningStages on submission; the child's task completions are buffered until the parent finishes). Complex
      six-stage DAG with diamond dependencies (verifies parent-tracking, deferred-event buffering, and correct release order when parents finish out of order). Concurrent stages disabled in
      properties (scheduler falls back to default sequential behavior).
    • Failure-path cleanup. Slot-check failure leaves internal state empty (no leak of partially-visited stages). Stage abort with a shared parent stage cleans up dependentStageMap
      (the parent isn't marked finished by the cascade, so the cleanup at the end of markStageAsFinished is the only path that can release the entry). Job cancellation cleans up both
      concurrentStages and dependentStageMap. Executor-loss-induced abort (via the maxFailures=1 path) cleans up both maps.
    • Speculation rejection in two variants: per-job local property and cluster-wide SparkConf. Both cause the job to fail on submission with a clear error.
    • An extraEmptyChecks hook on assertDataStructuresEmpty is overridden to assert both concurrentStages and dependentStageMap are empty, and is called from afterEach, so
      every locally-defined test and every inherited test from DAGSchedulerSuiteBase (149 of them) automatically validates the new state invariants. Total: 155 tests pass in this suite.

    By inheriting from DAGSchedulerSuiteBase, the suite also runs all 149 existing DAGScheduler tests against ConcurrentStageDAGScheduler — free regression coverage that the new
    scheduler behaves identically to DAGScheduler when concurrent mode is not engaged.

  2. TaskSchedulerImplSuite — one new test: a TaskSet with streaming.concurrent.stages.enabled=true is submitted with maxTaskFailures=1 regardless of spark.task.maxFailures; a
    regular TaskSet still gets the cluster default. Regression-guards both branches of the new conditional.

  3. TaskSetManagerSuite — two new tests covering the new failure-counting behavior:

    • With concurrent stages enabled, an ExecutorLostFailure with exitCausedByApp=false counts toward maxTaskFailures (the query restarts rather than silently absorbing executor
      loss).
    • Without concurrent stages, the same failure does not count — regression guard for the default behavior.

Full run: core/testOnly *DAGSchedulerSuite *ConcurrentStageDAGSchedulerSuite *TaskSetManagerSuite *TaskSchedulerImplSuite (489 tests, all pass).

Was this patch authored or co-authored using generative AI tooling?

Co-authored with Claude Code (Claude Opus 4.7)

@jerrypeng jerrypeng changed the title [SPARK-XXXXX][CORE] Add ConcurrentStageDAGScheduler for low-latency s… [SPARK-57000][CORE][SS][RTM] Add concurrent scheduling capabilites for Real-time Mode May 22, 2026
…treaming

Ports the ConcurrentStageDAGScheduler from the Databricks runtime so that
streaming queries can opt in to a "real-time" execution mode that runs all
stages of a job concurrently rather than sequentially.

When enabled via spark.scheduler.dagSchedulerType=ConcurrentStageDAGScheduler
and the per-job streaming.concurrent.stages.enabled property, the scheduler:

- Marks all ancestor stages of the final stage as concurrent on job submission
  and validates that the cluster has enough free slots
  (CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT), gated by
  spark.scheduler.realtimeModeSlotsCheck.disabled.
- Submits child stages while parents are still running, delays task completion
  events for a child whose parent is still running, and replays the delayed
  events when the parent finishes.
- Rejects speculative execution.
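
The delay-and-replay behavior in the second bullet can be modeled roughly as below. This is a simplified sketch under stated assumptions: stage ids are plain `Int`s, a completion event is a `String`, and `DeferredCompletionBuffer`, `registerChild`, and `deliver` are hypothetical names; only `dependentStageMap` and `markStageAsFinished` echo identifiers from the PR.

```scala
import scala.collection.mutable

// Hypothetical model of the buffering; not the PR's implementation.
final class DeferredCompletionBuffer(deliver: (Int, String) => Unit) {
  private final class Pending(val waitingOn: mutable.Set[Int]) {
    val delayed: mutable.Queue[String] = mutable.Queue.empty
  }

  // child stage id -> parents still running, plus buffered completion events
  private val dependentStageMap = mutable.Map.empty[Int, Pending]

  def registerChild(child: Int, runningParents: Set[Int]): Unit =
    if (runningParents.nonEmpty) {
      dependentStageMap(child) = new Pending(mutable.Set.empty[Int] ++= runningParents)
    }

  def onTaskCompletion(stage: Int, event: String): Unit =
    dependentStageMap.get(stage) match {
      case Some(pending) => pending.delayed.enqueue(event) // a parent is still running
      case None => deliver(stage, event)                   // safe to process now
    }

  def markStageAsFinished(stage: Int): Unit = {
    // Children whose last running parent was `stage` become ready.
    val ready = dependentStageMap.toList.filter { case (_, pending) =>
      pending.waitingOn.remove(stage)
      pending.waitingOn.isEmpty
    }
    ready.foreach { case (child, pending) =>
      dependentStageMap.remove(child)
      pending.delayed.foreach(event => deliver(child, event)) // replay in order
    }
    // Drop the finished stage's own entry so an aborted child cannot leak.
    dependentStageMap.remove(stage)
  }

  def isEmpty: Boolean = dependentStageMap.isEmpty
}
```

With two parents, events for the child stay buffered until the second parent finishes, and they are then replayed in arrival order.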

DAGScheduler changes (no-op for the default scheduler):
- New protected onFinalStageCreated hook, invoked from handleJobSubmitted /
  handleMapStageSubmitted right after final stage creation.
- New protected submitConcurrentStage and postSchedulerEvent helpers.
- New package-visible isRunningStage and getStage accessors.
- submitStage and markStageAsFinished relaxed from private to protected so
  subclasses can override them.

DAGSchedulerSuite refactor:
- Renames the concrete suite to abstract DAGSchedulerSuiteBase and adds an
  empty class DAGSchedulerSuite extends DAGSchedulerSuiteBase to preserve
  the existing entry point.
- Extracts a TestDAGScheduler trait carrying the scheduleShuffleMergeFinalize
  and handleTaskCompletion overrides; MyDAGScheduler mixes the trait in.
- Adds a protected createInitialScheduler hook used by init().
- Loosens submit, completeShuffleMapStageSuccessfully,
  completeNextResultStageWithSuccess, and assertDataStructuresEmpty to
  protected so subclass suites can use them.

Integration:
- SparkContext picks the scheduler implementation based on
  spark.scheduler.dagSchedulerType.
- TaskSchedulerImpl uses maxFailures=1 for concurrent-stage TaskSets so a
  failure restarts the streaming query instead of being silently retried.
- TaskSetManager counts ExecutorLostFailure toward task failures and skips
  the "executor lost is not the task's fault" exemption in concurrent mode.

Adds the supporting LogKeys (PARENT_STAGE, STREAMING_QUERY_ID) and the
CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT error class.

Deviations from the runtime source kept to the minimum necessary to compile
in OSS:
- Extends DAGScheduler directly (runtime extends CrossJobDepDAGScheduler,
  which gates micro-batch pipelining; not part of OSS).
- Hook is named onFinalStageCreated rather than the runtime's
  populateCrossJobDepInfo, since CrossJobDepDAGScheduler is not part of OSS.
- Micro-batch pipelining co-existence check (and its test) dropped, since
  MBP is not part of OSS.
- getStreamingBatchIdFromProperties and StreamingBatchId live in the
  companion object instead of CrossJobDepDAGScheduler.
- Slot check uses sc.schedulerBackend.defaultParallelism() in place of
  the runtime's TaskSchedulerStats helper.
- DatabricksEdgeConfigs.serverlessEnabled gating removed; the
  spark.scheduler.realtimeModeSlotsCheck.disabled config is the sole knob.
- isConcurrentStagesEnabled tolerates null Properties (OSS TaskSet allows
  null in tests).

Co-authored-by: Isaac
@jerrypeng jerrypeng force-pushed the oss-concurrent-stage-scheduler branch from 3d0058f to e2a204b Compare May 22, 2026 17:30
Comment thread core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala Outdated
Comment thread core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala Outdated
Comment thread core/src/main/scala/org/apache/spark/scheduler/ConcurrentStageDAGScheduler.scala Outdated
val totalSlots = sc.schedulerBackend.defaultParallelism()
val coresInUse = runningStages.toArray.map(totalNumCoreForStage(_)).sum
if (totalSlots - coresInUse < totalCoresNeeded) {
  throw new SparkRuntimeException(

Contributor

When this throws, the stages added to concurrentStages above are leaked — handleJobSubmitted catches the exception and fails the job, but nothing ever clears those entries. A subsequent job whose stages share IDs (e.g. retries from the same RDDChain) would inherit them. Either clear concurrentStages of the stages just visited before throwing, or capture them in a local set and only commit to concurrentStages once the slot check passes.

Contributor Author

fixed by accumulating into a local visitedStages set during the DAG walk and only committing to concurrentStages after the slot check passes

Contributor Author

Though the actual effect of this would likely be small, since it would only occur on query failure.

}

// This is overridden to handle any delayed task completion events for dependent stages.
override def markStageAsFinished(

Contributor

The dependentStageMap cleanup path only fires when a stage in the map is named as a parent via markStageAsFinished(parent). If a dependent stage itself aborts mid-job (e.g. its single allowed failure under maxTaskFailures=1), its own entry — including any buffered delayedTaskCompletionEvents — is never removed from dependentStageMap. With concurrent jobs sharing a long-lived scheduler instance, that's a slow leak across queries. Consider clearing the entry for stage itself inside markStageAsFinished (especially when errorMessage.isDefined).

Contributor Author

I will just remove the stage's own entry at the end of markStageAsFinished

Comment thread common/utils/src/main/resources/error/error-conditions.json Outdated
STREAMING_DATA_SOURCE_NAME,
STREAMING_OFFSETS_END,
STREAMING_OFFSETS_START,
STREAMING_QUERY_ID,

Contributor

QUERY_ID already exists and is what StructuredStreamingIdAwareSchedulerLogging uses to log streaming query IDs. Adding STREAMING_QUERY_ID creates a parallel key for the same concept. Suggest dropping this addition and using LogKeys.QUERY_ID at all the callsites, or update the callsites in StructuredStreamingIdAwareSchedulerLogging.

Contributor Author

A query id and streaming query id are typically not the same. query id for a batch query is simply a transient id for a batch. The streaming query id is persistent for the entirety of the streaming query execution.

I would keep it here and fix it in StructuredStreamingIdAwareSchedulerLogging

Comment thread core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala Outdated
@jiangxb1987 jiangxb1987 requested a review from Ngone51 May 27, 2026 05:07
Fixes:
- DAGScheduler.submitConcurrentStage: change `new IllegalStateException(...)` to
  `throw new IllegalStateException(...)` so the unexpected-state branch actually
  fails instead of silently being a no-op.
- ConcurrentStageDAGScheduler.onFinalStageCreated: walk the DAG into a local
  `visitedStages` set and only commit to `concurrentStages` after the slot check
  passes, so a slot-check failure can't leak stage references into the long-lived
  scheduler state.
- ConcurrentStageDAGScheduler.markStageAsFinished: unconditionally drop the
  stage's own entry from `dependentStageMap` at the end. On the success path the
  entry has already been removed by `checkDependentStageTasks`, so this is a
  no-op; on failure/cancellation/abort it's the missing cleanup that previously
  required the parent stage to be marked finished (which doesn't always happen
  if the parent is shared with another job).
- ConcurrentStageDAGScheduler.onFinalStageCreated: speculation check also reads
  `sc.conf.get(SPECULATION_ENABLED)`, matching how the rest of core reads the
  config; users with cluster-wide spark.speculation=true were previously
  bypassing this guard.
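
As an illustration of the last fix, a guard that consults both sources might look like the sketch below. `SpeculationGuard` and `check` are hypothetical, and reading the per-job setting under the same `spark.speculation` key is an assumption made for this example; the PR itself reads the cluster-wide value via `sc.conf.get(SPECULATION_ENABLED)`.

```scala
import java.util.Properties

// Hypothetical guard; a plain Map stands in for SparkConf.
object SpeculationGuard {
  // Cluster-wide key mentioned above; reusing it per job is an assumption.
  val SpeculationKey = "spark.speculation"

  // Returns a rejection message if speculation is on anywhere, else None.
  def check(jobProps: Properties, clusterConf: Map[String, String]): Option[String] = {
    // getProperty returns null when unset, and parseBoolean(null) is false.
    val perJob = jobProps != null &&
      java.lang.Boolean.parseBoolean(jobProps.getProperty(SpeculationKey))
    val clusterWide =
      clusterConf.get(SpeculationKey).exists(v => java.lang.Boolean.parseBoolean(v))

    if (perJob) Some("speculation enabled via job property")
    else if (clusterWide) Some("speculation enabled via cluster conf")
    else None
  }
}
```

Checking only the job property would let a cluster-wide setting slip through, which is the bypass this fix closes.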

API cleanup:
- Move `submitConcurrentStage` into ConcurrentStageDAGScheduler as a private
  method. Remove `postSchedulerEvent` entirely (callers now use
  `eventProcessLoop.post(event)` directly since it's already `private[spark]`).
  Relax `submitMissingTasks` and `activeJobForStage` to `protected` so the
  subclass can call them.
- Reuse `StructuredStreamingIdAwareSchedulerLogging.QUERY_ID_KEY` and
  `BATCH_ID_KEY` constants instead of hardcoded strings; drop the unused
  `runId` field from `StreamingBatchId` (CrossJobDepDAGScheduler — which
  consumes it — is not part of this PR).

Test scaffolding:
- Add `protected def extraEmptyChecks(): Unit = ()` hook to
  `assertDataStructuresEmpty` in DAGSchedulerSuiteBase; override in
  ConcurrentStageDAGSchedulerSuite to assert `concurrentStages` and
  `dependentStageMap` are empty.
- Also call `extraEmptyChecks()` in `afterEach`, so every inherited test (and
  every locally-defined test) automatically validates that the new state hasn't
  leaked. Pattern-match on the scheduler type to skip the check when an
  inherited test replaces the scheduler with a plain MyDAGScheduler.
- Relax `failed` and `cancel` to `protected` in DAGSchedulerSuiteBase so
  subclass suites can use them.
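
The hook pattern described in this list can be shown in miniature. The sketch below has no ScalaTest dependency, and the class names `SchedulerSuiteBase` and `ConcurrentSchedulerSuite` plus the simplified field types are hypothetical; only `extraEmptyChecks`, `assertDataStructuresEmpty`, `concurrentStages`, and `dependentStageMap` come from the PR.

```scala
import scala.collection.mutable

// Hypothetical miniature of the suite hierarchy.
abstract class SchedulerSuiteBase {
  protected val runningStages: mutable.Set[Int] = mutable.Set.empty

  // No-op by default; subclasses add assertions for their own state.
  protected def extraEmptyChecks(): Unit = ()

  final def assertDataStructuresEmpty(): Unit = {
    assert(runningStages.isEmpty, "runningStages leaked")
    extraEmptyChecks()
  }
}

class ConcurrentSchedulerSuite extends SchedulerSuiteBase {
  // Simplified stand-ins for the concurrent scheduler's extra state.
  val concurrentStages: mutable.Set[Int] = mutable.Set.empty
  val dependentStageMap: mutable.Map[Int, List[String]] = mutable.Map.empty

  override protected def extraEmptyChecks(): Unit = {
    assert(concurrentStages.isEmpty, "concurrentStages leaked")
    assert(dependentStageMap.isEmpty, "dependentStageMap leaked")
  }
}
```

Every test that already calls `assertDataStructuresEmpty` picks up the subclass's checks without being edited.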

New tests (in ConcurrentStageDAGSchedulerSuite):
- `concurrentStages is empty after slot-check failure` — exercises the
  visited-set commit pattern.
- `dependentStageMap entry is cleaned up when a dependent stage aborts and its
  parent stage is shared with another job` — sets up a shared shuffle stage
  between a batch and a concurrent job; fails the concurrent job's leaf and
  verifies the cleanup runs even though the parent isn't marked finished.
- `concurrentStages and dependentStageMap are cleaned up after job
  cancellation` — covers the JobCancelled event path.
- `concurrentStages and dependentStageMap are cleaned up after executor-loss
  induced abort` — covers the maxFailures=1-abort path.
- Speculation test split into per-job-property and cluster-wide-SparkConf
  variants; both verified to fail the job.

Typos and wording:
- Comment "states" → "stages" in ConcurrentStageDAGScheduler.
- "has only has" → "has only" in the CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT
  error message.
- "contribute the task failures" → "count toward the task failures" in
  TaskSetManager.
- Test comment "4 tasks in stage C" → "4 tasks in stage D" in the complex-
  pipeline test.

Co-authored-by: Isaac
@jerrypeng

Contributor Author

@jiangxb1987 thank you for the review! I have addressed your comments and added additional tests. PTAL.

@jerrypeng jerrypeng requested a review from jiangxb1987 May 28, 2026 06:35
// enqueues any saved task completion event (if any).
private def checkDependentStageTasks(stage: Stage): Unit = {
  val dependentStageInfo = dependentStageMap.getOrElse(
    stage, throw new RuntimeException(s"Stage $stage is not in dependentStageMap")

Contributor

This should be IllegalStateException instead of RuntimeException?

@mridulm mridulm left a comment

Contributor

Why is this not modelled using barrier scheduling?
IIRC I had raised this question in the SPIP as well

(Marking as requested change to prevent accidental merge)

@jerrypeng

Contributor Author

Thanks @mridulm for the question, and for your interest in this work!

The short answer is that barrier execution mode and concurrent stage scheduling solve orthogonal problems. As I understand it, barrier mode is gang scheduling for the tasks within a single stage: it launches all N tasks of that stage simultaneously, and the tasks can then coordinate with each other mid-execution via barrier() / allGather() (MPI-style).

What real-time mode needs is different — the ability to schedule multiple stages of a job to run concurrently (which is what this PR focuses on), so records can stream from upstream stages to downstream stages through a streaming shuffle. There's no hard requirement for all tasks to coordinate, or to be co-scheduled, before the query starts.

Your question, whether RTM could benefit from gang scheduling, is a fair one. I think the answer is "maybe, but not strictly necessary." The streaming shuffle implements a backpressure mechanism that serves a similar purpose: if a downstream consumer isn't ready yet, the upstream producer backs off rather than failing, so a coordinated execution system like barrier scheduling is not needed.

@mridulm

mridulm commented Jun 2, 2026

Contributor

From a scheduler perspective, it appears that the primitives required to make this work already exist - whether they get leveraged for MPI or real-time mode is an implementation detail.

Having said that, if we are robust to not requiring all stages/tasks to be running before execution (which was my previous understanding as per the proposal - perhaps I misunderstood) - what is the gap w.r.t. launching a bunch of long-running map stages?

Essentially, I am trying to understand why this needs to be in the scheduler - versus in integration code

To put it differently, I am trying to make sure we have robustly considered alternatives - and I want to understand their tradeoffs; they are not clear to me

@jerrypeng

Contributor Author

@mridulm — This change is not needed when the streaming query is a single stage: a single long-running (map) stage runs fine on the existing scheduler, which is exactly why RTM support for single-stage stateless queries already shipped in 4.1 — no scheduler change required there.

However, multi-stage queries (e.g. stateful queries) are today executed one stage at a time, with each stage's shuffle output fully materialized before the next stage starts. To reach millisecond-level latencies, we instead need the stages of a single query to run concurrently, connected by the streaming shuffle (currently being merged in incrementally). Enabling concurrent execution of dependent stages within a single job is what requires the scheduler change — which is why this work is needed.

@mridulm

mridulm commented Jun 2, 2026

Contributor

@jerrypeng you can launch N map stages and wire them to talk to each other for multi-stage queries?
Whether concurrent execution is possible depends on resource availability.

@jerrypeng

Contributor Author

@mridulm we can do that, but that is probably not the most elegant or simplest solution.

A multi-stage query isn't a list of independent map stages — it's a connected stage DAG the planner already produces: shuffle-map stages at each exchange, a result stage at the sink, real shuffle-dependency edges between them, and branching where it exists (e.g. a join stage reads two shuffle inputs). To emulate that as "N map stages wired together" we'd have to re-cut the plan into separate jobs, pre-mint and inject shuffleIds to recreate the dependency edges, and re-implement cross-job failure/cancellation/completion so the micro-batch still behaves as one unit — i.e. re-derive the DAGScheduler's own stage decomposition and job coordination in streaming code, to fake a DAG we already have natively.

So instead of faking it, we keep the real plan and its real stages, and change the only thing that's actually different in real-time mode: when the stages run. Normally a stage waits for its parent to finish; here the stages run at the same time, connected by the streaming shuffle. That's a scheduling decision about an existing DAG, which is why it belongs in the scheduler.

@mridulm

mridulm commented Jun 3, 2026

Contributor

@jerrypeng Everything you describe can be entirely in the integration layer - instead of at the scheduler.
Note - I am not necessarily against the idea of doing this change : but the semantics of what is being proposed is not clear, or fleshed out IMO.

We are letting the implementation details for a usecase define what the scheduler should look like, and making surgical changes to adapt to it - instead of defining what the semantics need to be (perhaps this has been done : but it is unclear from this PR anyway).

@jerrypeng

Contributor Author

@mridulm

Everything you describe can be entirely in the integration layer - instead of at the scheduler

Can you elaborate?

but the semantics of what is being proposed is not clear, or fleshed out IMO

Sorry to hear that! Can you help me understand better what is not clear to you?

@mridulm

mridulm commented Jun 3, 2026

Contributor

Can you elaborate?

For example, this integration could be modeled as I described above - submitting map stages with streaming shuffle wired up between the stages.
(I am not arguing it is ideal - but most likely possible)

Sorry to hear that! Can you help me understand better what is not clear to you?

The proposed scheduler is codifying expectations specific to this implementation - and not generic constructs.
For example, in the default DAGScheduler, there is a rationale for why a child waits for parent before it starts (this is different from MR, which has its own model)
Here, it is unclear why/when it can start, how 'deep' it can start, etc. - the decisions appear to be driven by RTM implementation details, and are not robustly defined - making it less extensible for other use cases. As an example, barrier scheduling was designed for ML apps, but the constructs in the scheduler are generic, and applicable to other use cases as well.

In other words, I want to make sure we make scheduler changes only when required, where the behavior is not implementation details in service of an initiative, and constructs help unlock a larger class of usecases.

@jerrypeng

Contributor Author

@mridulm thank you for clarifying! Some background may help.

Real-time mode is a new execution mode we're introducing in Structured Streaming that lets streaming queries process data with end-to-end latencies in the milliseconds. Reaching that requires a few changes to how queries execute; scheduling is one of them, and that's what this PR covers. For a query with multiple stages to hit millisecond latencies, the cluster has to run the tasks of all stages at the same time, with adjacent stages connected by a streaming shuffle (implemented in a separate set of PRs). That lets data flow continuously through the query DAG instead of one stage at a time — and processing one stage at a time is a core reason the current model can't reach these latencies.

On the changes in this PR: the change to the existing DAG scheduler is small and additive — a no-op hook, a couple of accessors, and a few visibility relaxations — and the default behavior is unchanged. The new capability lives in a separate class (ConcurrentStageDAGScheduler), so the real-time-mode logic is isolated from the shared scheduler.

On the semantics you asked about: the new capability is that the stages of a DAG can run concurrently rather than one at a time. Today the DAG scheduler treats a data dependency between two stages as a reason to run them sequentially — the child waits for the parent. But that sequencing isn't inherent to the dependency; it's a consequence of the shuffle being materialized, where the consumer can't read anything until the producer has written its complete output. With a streaming shuffle that the consumer reads incrementally, the producer and consumer stages can run at the same time. So the semantic this introduces is: a directional data dependency constrains the ordering of data, not necessarily the concurrency of execution — and when the connecting shuffle supports incremental reads, dependent stages may execute concurrently. I hope that clarifies it.

This is a general scheduling capability — concurrent execution of data-dependent stages over an incrementally-readable shuffle — and real-time mode is simply its first consumer.

I think the next question should be how to natively integrate this into the DAGScheduler so users don't need to explicitly configure ConcurrentStageDAGScheduler to get this capability. That is what I will be working on next.

@jerrypeng

Contributor Author

@mridulm do you have any additional concerns I can address?

@jerrypeng jerrypeng requested a review from mridulm June 5, 2026 19:05
@mridulm

mridulm commented Jun 6, 2026

Contributor

I am not in favor of merging this PR.
This comment is directionally better aligned with how we should approach it.

Strawman proposal - extend support for realtime shuffle as a first class concept within DAGScheduler.

Currently we have:

  • Narrow dependency between RDDs - merge into the same stage.
  • Shuffle dependency - introduce a shuffle split (unless it is provable that we can convert it to narrow dependency).

With semantics around how to handle failures, etc.

Extend this to support real time shuffle as a first-class construct, and define:
a) Given a job, how it gets 'split' into stages and wire them based on real time shuffle dependency (when to split, when to combine within stage ? How it interacts with everything else ? For unsupported idioms - explicitly fail fast)
b) Which stages can be concurrently executed and which need to wait - define semantics around progress, starvation, etc.
c) What are the semantics around failures
d) How does this interact with existing constructs (for ex: if there is 'regular' shuffle dependency ? throw exception ? supported ?)

A lot of these are already in the current PR - we need to simply formalize them, and integrate into existing machinary.
This PR is good to test things out and validate ideas - but not for merging into Apache Spark itself

@jerrypeng

Contributor Author

@mridulm thank you for the detailed feedback — I think we're aligned on the destination, and I'd like to propose reaching it incrementally.

I agree the end state is: these scheduling semantics supported by the default DAGScheduler, with richer, more fine-grained abstractions — e.g. annotating in the query plan which shuffles can be read incrementally, rather than an opt-in flag. My question is whether we can sequence it into milestones rather than land it all at once.

IMO this PR already declares clear semantics for the new scheduling capability, and they're fairly generic:

  1. The shuffle connecting two concurrent stages is read incrementally: the consumer reads from a still-running producer instead of waiting for fully materialized output.
  2. Because of that, stages with a data dependency can run concurrently rather than sequentially.
  3. Because that incremental shuffle is transient (its data can't be replayed), any task failure restarts the whole job.

None of these reference streaming — real-time mode is just the first caller, and the capability isn't streaming-specific: any feature that uses an incrementally-readable shuffle can opt into the same semantics. The PR gates them behind a streaming-named property for expedience (streaming.concurrent.stages.enabled) — happy to rename it to something more generic if you'd like.

I'd also note the DAGScheduler footprint is deliberately small: the base-class change is a no-op hook, a couple of accessors, and a few visibility relaxations, with the default execution path unchanged — precisely because I share your concern that changes there are high-risk. The new behavior is fully opt-in, so structuring it this way keeps the blast radius small: unrelated queries can't be affected by these changes. That's also what makes landing it incrementally low-risk.

Could we use this PR as the first milestone and merge it as-is? It would let us test and validate real-time mode end-to-end in-tree while we design the deeper integration. As an immediate follow-up, I will work through how to make these semantics more natively defined in the DAGScheduler.

What do you think?

@jerrypeng

Contributor Author

@mridulm To give a sense of what I'm picturing for the more native design:
Entry point — a capability flag on the shuffle. Add a field to ShuffleExchangeExec:

  // true if this shuffle can be read incrementally
  // (the consumer can read from a still-running producer)
  incrementalHint: Boolean

It's set during physical planning (this is an execution concern, not a logical-plan one), flows into the ShuffleDependency the exchange creates, and is read by the DAGScheduler at stage-creation time. That single flag is the entry point that opts a shuffle edge into concurrent scheduling over an incremental shuffle (e.g. the "streaming shuffle" we're building for RTM).

How RTM sets it. A physical-planning rule for RTM streaming queries marks the query's shuffle exchanges with incrementalHint = true. Nothing here is streaming-specific: any feature can write its own physical-planning rule to opt a job into concurrent scheduling + incremental shuffles, so the capability is generic — RTM is just the first caller.

Semantics implied by an incremental shuffle:

  1. The stages on either side of the shuffle can run concurrently — the consumer reads from a still-running producer instead of waiting for fully materialized output.

  2. Because the incremental shuffle's data is transient (it can't be replayed), a task failure in either of the co-scheduled stages reruns them (for RTM, the query restarts from checkpoint).

A cleaner generalization — separate "incremental" from "persistent". Semantic (2) is really a consequence of a second, orthogonal property: whether the shuffle data is durable/replayable. We can make that explicit with a second flag:

  // true if the shuffle data is durable/replayable, so a single task
  // failure does not require rerunning the producer stage
  persistentHint: Boolean   // name TBD

That decouples two concerns:

  • incrementally-readable → enables concurrent scheduling.
  • persistent/replayable → determines failure recovery: a transient shuffle means rerun the co-scheduled stages; a durable shuffle allows normal fine-grained recovery (re-read from the persisted data / offset).

Though I would defer implementing the "persistentHint" capability until there is an actual use case / implementation.

The streaming shuffle is incremental and transient. But a Kafka-backed shuffle, say, could be incremental and persistent — concurrent scheduling without the rerun-everything-on-failure penalty, since the consumer can replay from an offset. Splitting the two flags lets the construct compose across those cases instead of hardcoding RTM's failure model.
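
To make the two-flag split concrete, here is a minimal sketch of how the flags could map to scheduling and recovery decisions. All names (`ShuffleSemanticsSketch`, `ShuffleHints`, the policy objects) are hypothetical and exist only for this example; nothing here is existing Spark API or code from the PR.

```scala
object ShuffleSemanticsSketch {
  sealed trait Scheduling
  case object Sequential extends Scheduling
  case object Concurrent extends Scheduling

  sealed trait Recovery
  case object RerunCoScheduledStages extends Recovery
  case object FineGrained extends Recovery

  // Hypothetical mirror of the proposed incrementalHint / persistentHint flags.
  final case class ShuffleHints(incremental: Boolean, persistent: Boolean)

  // Incrementally readable edges allow producer and consumer to run together.
  def scheduling(h: ShuffleHints): Scheduling =
    if (h.incremental) Concurrent else Sequential

  // Durable/replayable data allows fine-grained recovery; transient data
  // forces a rerun of the co-scheduled stages.
  def recovery(h: ShuffleHints): Recovery =
    if (h.persistent) FineGrained else RerunCoScheduledStages
}
```

Under these assumptions the streaming shuffle is `ShuffleHints(incremental = true, persistent = false)`, and a Kafka-backed shuffle would be `ShuffleHints(incremental = true, persistent = true)`.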

Pluggable shuffle implementation via config. The engine maps the capability to a concrete ShuffleManager. Today we dispatch between sort and streaming managers on a per-job property; the cleaner version selects per-dependency from incrementalHint, with the implementations configured:

  spark.shuffle.manager             = org.apache.spark.shuffle.sort.SortShuffleManager
  spark.shuffle.manager.incremental = org.apache.spark.shuffle.streaming.StreamingShuffleManager

A shuffle with incrementalHint = true is served by the configured incremental manager; everything else by the default — keeping the scheduler construct generic while the shuffle implementation stays pluggable.
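
A minimal sketch of that per-dependency selection, assuming the configuration is available as a plain `Map[String, String]`. `spark.shuffle.manager.incremental` is the key proposed above, not an existing Spark config, and `ShuffleManagerSelectionSketch` and `managerClassFor` are hypothetical names. Returning an error when no incremental manager is configured is an assumption of this sketch.

```scala
object ShuffleManagerSelectionSketch {
  val DefaultKey = "spark.shuffle.manager"
  // Proposed in this thread; not an existing Spark configuration key.
  val IncrementalKey = "spark.shuffle.manager.incremental"
  val SortManager = "org.apache.spark.shuffle.sort.SortShuffleManager"

  // Picks the shuffle manager class name for one shuffle dependency.
  // An incremental edge without a configured incremental manager is an
  // error here, since a materializing manager could not serve it.
  def managerClassFor(
      conf: Map[String, String],
      incrementalHint: Boolean): Either[String, String] =
    if (incrementalHint) {
      conf.get(IncrementalKey)
        .toRight(s"$IncrementalKey must be set for incremental shuffles")
    } else {
      Right(conf.getOrElse(DefaultKey, SortManager))
    }
}
```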

Let me know what you think, though I would still prefer to do this incrementally, as described in my previous comment.

@mridulm

mridulm commented Jun 10, 2026

Contributor

Let me rephrase it: why do we want to add this workaround, and not integrate it into DAGScheduler?
Are we concerned about stability? Incompleteness of implementation? Side effects/interaction concerns?

Any scheduler change suffers from potential risks - and I am trying to reason about why we are special-casing here and introducing an entirely new DAG scheduler.

If we want to make the DAG scheduler pluggable - that is a design in itself, and needs to be thought through - not treated as a derisking mechanism for a specific feature.

@jerrypeng

Contributor Author

@mridulm the implementation presented in this PR is not really a workaround — it is a working solution for the needs of RTM.

I agree that we can eventually design a more natively integrated solution that provides more generic functionality. However, can we approach that incrementally?

My philosophy for software development is iterative. I would like to first introduce something that works for the RTM use case, while minimizing risk to existing Spark use cases. That is what this PR is trying to do. The changes are intentionally scoped so that we can test RTM end-to-end without requiring a larger DAGScheduler redesign upfront.

I would rather get something working first, validate it end-to-end, and then iteratively refine the abstractions. Building a more generic framework may take time, and I am happy to work toward that, but I do not think we should block RTM progress on having the fully generalized design in place from day one.

Let me know what you think. Regardless, I am going to look into how to build this functionality natively in the DAGScheduler.

@mridulm

mridulm commented Jun 10, 2026

Contributor

As I said before, I would suggest working towards integrating into DAGScheduler with a principled solution, rather than introducing extension mechanisms just to derisk the work; that is a feasible approach for a prototype.

If we want to make DAGScheduler pluggable - that could be a design in itself - and should be approached as such. To be honest, there is a case to be made for that given the complexity.

I am -1 on this specific direction for the change until then.

+CC @jiangxb1987 as you reviewed this PR as well.

@jerrypeng

jerrypeng commented Jun 23, 2026

Contributor Author

@mridulm what do you think about the approach I described here:

#56055 (comment)

@cloud-fan

Contributor

Joining late — I've read the thread and I'm with @mridulm on direction: this should be a principled construct inside DAGScheduler, not a separate scheduler picked by config (pluggable DAGScheduler is its own design problem, not a derisking mechanism for one feature). But I think @jerrypeng's latest sketch (comment) is most of the way there, and this PR is a useful prototype for nailing down the semantics. Let me try to converge them.

The new ability is cross-stage gang scheduling, and that already implies a streaming shuffle. Co-scheduling two data-dependent stages is only useful if the edge is readable before the producer finishes — i.e. a streaming shuffle. So "run these stages concurrently" and "the shuffle is incremental" aren't orthogonal; they're one decision seen from two sides. The incrementalHint/persistentHint split models a four-state space where only one cell is real today (incremental + transient = the streaming shuffle), and persistentHint has no implementation behind it — I'd drop it until a persistent-incremental shuffle (e.g. Kafka-backed) actually needs it.

One nuance on "barrier": what's barrier-like is the resource side — the co-scheduled stages must all get slots at once or fail fast, which is exactly what this PR's CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT reimplements on top of the existing checkBarrierStageWithNumSlots. What's not barrier-like is execution — backpressure handles readiness, no sync point. So it's gang-scheduled stages + pipelined execution. (I'd avoid the word "barrier" in the API since it already means intra-stage MPI scheduling, but the lineage — generalizing barrier's gang-resource model from tasks to a stage set — is worth stating.)
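
The resource side can be sketched as an all-or-nothing slot check over the whole stage set. This is only an illustration under simplifying assumptions (one slot per task, a known free-slot count); `GangSlotCheckSketch`, `StageInfo`, and `checkSlots` are hypothetical names, and the sketch does not call or reproduce `checkBarrierStageWithNumSlots`.

```scala
object GangSlotCheckSketch {
  // Hypothetical, simplified view of a stage: an id and its task count.
  final case class StageInfo(id: Int, numTasks: Int)

  // All stages in the group must fit at once; otherwise fail fast
  // instead of launching a subset that could wait forever on its peers.
  def checkSlots(group: Seq[StageInfo], freeSlots: Int): Either[String, Unit] = {
    val required = group.map(_.numTasks).sum
    if (required <= freeSlots) Right(())
    else Left(s"group needs $required slots but only $freeSlots are free")
  }
}
```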

Concretely, I'd express @mridulm's (a)–(d) as one first-class dependency kind rather than two hint flags: a pipelined shuffle dependency, peer to narrow/shuffle deps, set in physical planning and carried into the ShuffleDependency.

Add: the marker on ShuffleDependency; treat a pipelined edge as non-sequencing in stage creation so the connected component forms one co-scheduled group (a); submit the group together through a generalized barrier slot check (b); a group-failure policy where any failure reruns the group (c); fail-fast where a regular shuffle dependency crosses the group boundary (d).
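
One way to picture (a) and (d) is to treat stages as nodes and compute connected components over pipelined edges only. The sketch below is an assumption-laden illustration, not the proposed implementation: `PipelinedGroupSketch`, `Edge`, `groups`, and `crossingRegularEdges` are hypothetical names, stages are plain integers, and every edge endpoint is assumed to appear in `stages`. It reads "crosses the group boundary" as a regular edge whose endpoints sit in different groups where at least one group has more than one stage.

```scala
import scala.annotation.tailrec

object PipelinedGroupSketch {
  // Hypothetical edge between stages; pipelined marks the new dependency kind.
  final case class Edge(parent: Int, child: Int, pipelined: Boolean)

  // Connected components over pipelined edges, ignoring direction.
  def groups(stages: Set[Int], edges: Seq[Edge]): Set[Set[Int]] = {
    val adjacency: Map[Int, Set[Int]] = edges
      .filter(_.pipelined)
      .flatMap(e => Seq(e.parent -> e.child, e.child -> e.parent))
      .groupBy(_._1)
      .map { case (stage, pairs) => stage -> pairs.map(_._2).toSet }

    @tailrec
    def reach(frontier: Set[Int], seen: Set[Int]): Set[Int] =
      if (frontier.isEmpty) seen
      else {
        val next = frontier.flatMap(s => adjacency.getOrElse(s, Set.empty[Int])) -- seen
        reach(next, seen ++ next)
      }

    stages.map(s => reach(Set(s), Set(s)))
  }

  // Regular (materialized) shuffle edges that enter or leave a multi-stage
  // group; under the fail-fast rule these would reject the job.
  def crossingRegularEdges(stages: Set[Int], edges: Seq[Edge]): Seq[Edge] = {
    val gs = groups(stages, edges)
    val groupOf: Map[Int, Set[Int]] =
      stages.map(s => s -> gs.find(_.contains(s)).get).toMap
    edges.filter { e =>
      val (p, c) = (groupOf(e.parent), groupOf(e.child))
      !e.pipelined && p != c && (p.size > 1 || c.size > 1)
    }
  }
}
```

A job made only of regular shuffle edges yields singleton groups and no crossing edges, so the default sequential path would be unaffected in this model.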

Remove: ConcurrentStageDAGScheduler, spark.scheduler.dagSchedulerType, and the per-job streaming.concurrent.stages.enabled property (replaced by the marker); CONCURRENT_SCHEDULER_INSUFFICIENT_SLOT + realtimeModeSlotsCheck.disabled (subsumed by the slot check); and re-key the TaskSchedulerImpl/TaskSetManager failure changes on group membership rather than the streaming property. RTM then becomes a pure consumer — a physical-planning rule marks its exchanges pipelined, no streaming-specific code in the scheduler.

On out-of-order completion: with stages concurrent, the result stage's tasks can finish while parents run, and the base Success path would declare the job done and cancel the still-running parents. The current PR buffers a stage's completion events until its parents finish, then replays them. I'd keep that approach — the all-or-nothing group-failure model means we never need partial-progress correctness inside a group (the only thing a native rewrite would buy), and the buffer keeps the batch-critical completion paths untouched while naturally holding a finished-but-still-pipelining stage in runningStages. Just make the group the first-class concept — an explicit group-completion contract implemented via the deferral — so it's a documented mechanism, not a free-floating shim. Worth a test for the replay window (a failure between parent-finish and replay → group reruns, buffer dropped).
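
The deferral can be sketched as a small buffer keyed by stage. This is a simplified model rather than the PR's code: `CompletionDeferralSketch`, `CompletionBuffer`, and its methods are hypothetical names, events are plain strings, and the caller decides when a stage counts as finished.

```scala
import scala.collection.mutable

object CompletionDeferralSketch {
  // parentsOf maps a stage id to the stage ids it reads from.
  final class CompletionBuffer(parentsOf: Map[Int, Set[Int]]) {
    private val finishedStages = mutable.Set.empty[Int]
    private val pending = mutable.Map.empty[Int, Vector[String]]

    // Events in the order they would reach the base completion handler.
    val applied: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

    private def parentsDone(stage: Int): Boolean =
      parentsOf.getOrElse(stage, Set.empty[Int]).forall(finishedStages.contains)

    // Apply immediately if every parent has finished; otherwise hold the event.
    def onEvent(stage: Int, event: String): Unit =
      if (parentsDone(stage)) applied += event
      else pending(stage) = pending.getOrElse(stage, Vector.empty[String]) :+ event

    // Mark a stage finished and replay any children that are now unblocked.
    def markFinished(stage: Int): Unit = {
      finishedStages += stage
      val ready = pending.keys.filter(parentsDone).toList.sorted
      ready.foreach { s =>
        applied ++= pending.remove(s).getOrElse(Vector.empty[String])
      }
    }

    // All-or-nothing failure: the group reruns, so buffered events are dropped.
    def onGroupFailure(): Unit = {
      pending.clear()
      finishedStages.clear()
    }
  }
}
```

In this model, a failure that lands between a parent finishing and the replay clears the buffer, which matches the "group reruns, buffer dropped" case suggested for testing.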

Sequencing: this still gets us there incrementally, just cut differently — land the dependency type + group scheduling + group failure as the generic milestone with a non-streaming test, then add the RTM rule + streaming shuffle on top. The scheduler primitive is the hard-to-revise part, so I'd rather get its shape right in-tree first with RTM as the validating consumer than merge the subclass and re-cut it later. Happy to help review the dependency-type design.


4 participants